/*
Copyright 2025 The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package downstream

import (
	"context"
	"errors"
	"fmt"

	"github.com/go-logr/logr"

	operationsv1alpha2 "github.com/kubeedge/api/apis/operations/v1alpha2"
	"github.com/kubeedge/kubeedge/cloud/pkg/cloudhub"
	"github.com/kubeedge/kubeedge/cloud/pkg/common/nodes"
	"github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/executor"
	"github.com/kubeedge/kubeedge/cloud/pkg/taskmanager/wrap"
)

// DownstreamHandler is the per-resource contract used to push node jobs
// downstream. One implementation is registered in downstreamHandlers for each
// supported node job resource, and watchJobDownstream drives it: it reads jobs
// from ExecutorChan and hands UpdateNodeTaskStatus to the node task executor
// as its status callback.
type DownstreamHandler interface {
	// Logger returns the downstream handler logger.
	// watchJobDownstream uses it for all messages about this handler's jobs.
	Logger() logr.Logger

	// CanDownstreamPhase returns whether node tasks can be performed during the node job phase.
	// The job is passed as any; the concrete type expected is up to the implementation.
	CanDownstreamPhase(job any) bool

	// ExecutorChan returns the channel of the node job. The channel data is generated by NodeJobEventHandler
	// and consumed by watchJobDownstream, which calls this method on every loop iteration.
	ExecutorChan() chan wrap.NodeJob

	// InterruptExecutor interrupts downstream executor if it is running.
	// When a node job CR is deleted, it is necessary to determine whether a node job's executor exists.
	// If exists, it means that there are node tasks being executed and this executor needs to be interrupted.
	InterruptExecutor(job any)

	// UpdateNodeTaskStatus updates the status of the node task.
	// It is passed to executor.NewNodeTaskExecutor as the status update callback.
	UpdateNodeTaskStatus(ctx context.Context, job wrap.NodeJob, task wrap.NodeJobTask)
}

// downstreamChanSize is the buffer size limit of a downstream channel.
const downstreamChanSize = 10

// downstreamHandlers holds the registered downstream handlers keyed by node
// job resource name. Init fills it; MustGetHandlerWithObj and Start read it.
var downstreamHandlers = map[string]DownstreamHandler{}

// Init creates the downstream handler of every supported node job resource
// (node upgrade, image pre-pull and config update) and registers it in
// downstreamHandlers under the resource name.
//
// Init stops at the first constructor that fails; handlers registered before
// the failure stay registered. The returned error wraps the constructor's
// error with %w, so callers can inspect the cause with errors.Is / errors.As.
func Init(ctx context.Context) error {
	nodeUpgradeJobHandler, err := newNodeUpgradeJobHandler(ctx)
	if err != nil {
		return fmt.Errorf("failed to create node upgrade job downstream handler, err: %w", err)
	}
	downstreamHandlers[operationsv1alpha2.ResourceNodeUpgradeJob] = nodeUpgradeJobHandler

	imagePrePullJobHandler, err := newImagePrepullJobHandler(ctx)
	if err != nil {
		return fmt.Errorf("failed to create image pre pull job downstream handler, err: %w", err)
	}
	downstreamHandlers[operationsv1alpha2.ResourceImagePrePullJob] = imagePrePullJobHandler

	configUpdateJobHandler, err := newConfigUpdateJobHandler(ctx)
	if err != nil {
		return fmt.Errorf("failed to create configupdate job downstream handler, err: %w", err)
	}
	downstreamHandlers[operationsv1alpha2.ResourceConfigUpdateJob] = configUpdateJobHandler
	return nil
}

// MustGetHandlerWithObj looks up the downstream handler that matches the
// concrete type of obj.
//
// obj must be a pointer to one of the supported node job types
// (NodeUpgradeJob, ImagePrePullJob, ConfigUpdateJob); any other type yields an
// "invalid node job type" error. An error is also returned when no non-nil
// handler is registered for the matching resource.
func MustGetHandlerWithObj(obj any) (DownstreamHandler, error) {
	// First map the job type to its resource name ...
	var resource string
	switch obj.(type) {
	case *operationsv1alpha2.NodeUpgradeJob:
		resource = operationsv1alpha2.ResourceNodeUpgradeJob
	case *operationsv1alpha2.ImagePrePullJob:
		resource = operationsv1alpha2.ResourceImagePrePullJob
	case *operationsv1alpha2.ConfigUpdateJob:
		resource = operationsv1alpha2.ResourceConfigUpdateJob
	default:
		return nil, fmt.Errorf("invalid node job type %T", obj)
	}

	// ... then resolve the handler with a single lookup. A missing entry and
	// an entry holding nil are treated the same way.
	if h := downstreamHandlers[resource]; h != nil {
		return h, nil
	}
	return nil, errors.New("downstream handler must be not nil")
}

// Start launches one watchJobDownstream goroutine for every handler registered
// in downstreamHandlers and returns immediately. ctx is handed to each watcher.
func Start(ctx context.Context) {
	for resource := range downstreamHandlers {
		go watchJobDownstream(ctx, downstreamHandlers[resource])
	}
}

// watchJobDownstream consumes node jobs from the handler's executor channel and
// starts a node task executor for each of them. It blocks until ctx is done or
// the channel is closed, so it is meant to run in its own goroutine.
//
// For every received job it:
//  1. obtains the cloudhub session manager,
//  2. creates a node task executor, skipping the job when
//     executor.NewNodeTaskExecutor reports that one is already loaded, and
//  3. runs the executor in a new goroutine with the result of
//     nodes.GetManagedEdgeNodes for the session manager's NodeSessions.
//
// Failures in steps 1 and 2 are logged and the job is skipped; they do not
// stop the watcher.
func watchJobDownstream(ctx context.Context, handler DownstreamHandler) {
	logger := handler.Logger()
	for {
		select {
		case <-ctx.Done():
			return
		case obj, ok := <-handler.ExecutorChan():
			if !ok {
				// A closed channel is always ready to receive and yields zero
				// values, so without this guard the loop would spin forever
				// on empty jobs.
				logger.Info("downstream executor channel is closed, stop watching")
				return
			}
			// Resolve the session manager before creating the executor.
			// NOTE(review): NewNodeTaskExecutor's `loaded` result suggests it
			// records the executor for the job; creating it first and then
			// bailing out here would leave an executor that never runs and
			// makes later events for the same job look "already running".
			sm, err := cloudhub.GetSessionManager()
			if err != nil {
				logger.Error(err, "failed to get session manager",
					"type", obj.ResourceType(), "jobname", obj.Name())
				continue
			}
			exec, loaded, err := executor.NewNodeTaskExecutor(ctx, obj, handler.UpdateNodeTaskStatus)
			if err != nil {
				logger.Error(err, "failed to create node task executor",
					"type", obj.ResourceType(), "jobname", obj.Name())
				continue
			}
			if loaded {
				logger.V(1).Info("node task executor is already running, ignore it",
					"type", obj.ResourceType(), "jobname", obj.Name())
				continue
			}
			logger.V(1).Info("execute the node task", "type", obj.ResourceType(), "jobname", obj.Name())
			// The node list is evaluated here, in the watcher goroutine; only
			// Execute itself runs asynchronously.
			go exec.Execute(ctx, nodes.GetManagedEdgeNodes(&sm.NodeSessions))
		}
	}
}
